package com.baomidou.springboot.spark;

import com.baomidou.springboot.constant.Constants;
import com.baomidou.springboot.utils.StringUtils;
import org.apache.spark.AccumulatorParam;

/**
 * Created by pcmmm on 17/6/9.
 */
public class CityAccumulator implements AccumulatorParam<String> {
    /**
     * zero方法，其实主要用于数据的初始化
     * 那么，我们这里，就返回一个值，就是初始化中，所有范围区间的数量，都是0
     * 各个范围区间的统计数量的拼接，还是采用一如既往的key=value|key=value的连接串的格式
     */
    @Override
    public String zero(String v) {
        return Constants.CITY_BEIJING + "=0|"
                + Constants.CITY_SHANGHAI + "=0|"
                + Constants.CITY_GUANGZHOU + "=0|"
                + Constants.CITY_SHENZHEN + "=0|"
                + Constants.CITY_NANJING + "=0|"
                + Constants.CITY_HANGZHOU +  "=0";
    }
    @Override
    public String addInPlace(String v1, String v2) {
        return add(v1, v2);
    }

    @Override
    public String addAccumulator(String v1, String v2) {
        return add(v1, v2);
    }

    private String add(String v1, String v2) {
        // 校验：v1为空的话，直接返回v2
        if(StringUtils.isEmpty(v1)) {
            return v2;
        }

        // 使用StringUtils工具类，从v1中，提取v2对应的值，并累加1
        String oldValue = StringUtils.getFieldFromConcatString(v1, "\\|", v2);
        if(oldValue != null) {
            // 将范围区间原有的值，累加1
            int newValue = Integer.valueOf(oldValue) + 1;
            // 使用StringUtils工具类，将v1中，v2对应的值，设置成新的累加后的值
            return StringUtils.setFieldInConcatString(v1, "\\|", v2, String.valueOf(newValue));
        }

        return v1;
    }
}
